package com.offcn.bigdata.spark.p3

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

/**
 * Word-count accumulator for Spark: `add` records one occurrence of a word,
 * `value` exposes the tallies as an immutable `Map[word, count]`.
 */
class MyAccumulator extends AccumulatorV2[String, Map[String, Int]] {
    // Mutable working state; only ever exposed as an immutable snapshot via `value`.
    private var map = mutable.Map[String, Int]()

    /** Zero state: no words have been recorded. */
    override def isZero: Boolean = map.isEmpty

    /**
     * Returns an independent copy of this accumulator.
     *
     * BUG FIX: the previous implementation assigned `this.map` by reference,
     * so the "copy" shared mutable state with the original — updates on one
     * corrupted the other. Spark expects `copy()` to be a detached snapshot,
     * so the backing map is cloned here.
     */
    override def copy(): AccumulatorV2[String, Map[String, Int]] = {
        val newAcc = new MyAccumulator
        newAcc.map = this.map.clone()
        newAcc
    }

    /** Intra-partition accumulation: increment the count for `word`. */
    override def add(word: String): Unit =
        map.put(word, map.getOrElse(word, 0) + 1)

    /** Inter-partition merge: fold the other accumulator's counts into this one. */
    override def merge(other: AccumulatorV2[String, Map[String, Int]]): Unit =
        for ((word, count) <- other.value)
            map.put(word, map.getOrElse(word, 0) + count)

    /** Resets this accumulator back to the zero (empty) state. */
    override def reset(): Unit = map.clear()

    /** Immutable snapshot of the current word counts. */
    override def value: Map[String, Int] = map.toMap
}
